1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.mockito.Matchers.isA;
20  import static org.mockito.Mockito.inOrder;
21  import static org.mockito.Mockito.mock;
22  import static org.mockito.Mockito.spy;
23  import static org.mockito.Mockito.times;
24  
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.NoSuchElementException;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.junit.Test;
32  import org.mockito.InOrder;
33  
34  import rx.Observable;
35  import rx.Observer;
36  import rx.Subscriber;
37  import rx.functions.Action1;
38  import rx.functions.Func1;
39  import rx.functions.Func2;
40  
41  public class OperatorSingleTest {
42  
43      @Test
44      public void testSingle() {
45          Observable<Integer> observable = Observable.just(1).single();
46  
47          @SuppressWarnings("unchecked")
48          Observer<Integer> observer = mock(Observer.class);
49          observable.subscribe(observer);
50  
51          InOrder inOrder = inOrder(observer);
52          inOrder.verify(observer, times(1)).onNext(1);
53          inOrder.verify(observer, times(1)).onCompleted();
54          inOrder.verifyNoMoreInteractions();
55      }
56  
57      @Test
58      public void testSingleWithTooManyElements() {
59          Observable<Integer> observable = Observable.just(1, 2).single();
60  
61          @SuppressWarnings("unchecked")
62          Observer<Integer> observer = mock(Observer.class);
63          observable.subscribe(observer);
64  
65          InOrder inOrder = inOrder(observer);
66          inOrder.verify(observer, times(1)).onError(
67                  isA(IllegalArgumentException.class));
68          inOrder.verifyNoMoreInteractions();
69      }
70  
71      @Test
72      public void testSingleWithEmpty() {
73          Observable<Integer> observable = Observable.<Integer> empty().single();
74  
75          @SuppressWarnings("unchecked")
76          Observer<Integer> observer = mock(Observer.class);
77          observable.subscribe(observer);
78  
79          InOrder inOrder = inOrder(observer);
80          inOrder.verify(observer, times(1)).onError(
81                  isA(NoSuchElementException.class));
82          inOrder.verifyNoMoreInteractions();
83      }
84      
85      @Test
86      public void testSingleDoesNotRequestMoreThanItNeedsToEmitItem() {
87          final AtomicLong request = new AtomicLong();
88          Observable.just(1).doOnRequest(new Action1<Long>() {
89              @Override
90              public void call(Long n) {
91                  request.addAndGet(n);
92              }
93          }).toBlocking().single();
94          assertEquals(2, request.get());
95      }
96  
97      @Test
98      public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromEmpty() {
99          final AtomicLong request = new AtomicLong();
100         try {
101             Observable.empty().doOnRequest(new Action1<Long>() {
102                 @Override
103                 public void call(Long n) {
104                     request.addAndGet(n);
105                 }
106             }).toBlocking().single();
107         } catch (NoSuchElementException e) {
108             assertEquals(2, request.get());
109         }
110     }
111 
112     @Test
113     public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromMoreThanOne() {
114         final AtomicLong request = new AtomicLong();
115         try {
116             Observable.just(1, 2).doOnRequest(new Action1<Long>() {
117                 @Override
118                 public void call(Long n) {
119                     request.addAndGet(n);
120                 }
121             }).toBlocking().single();
122         } catch (IllegalArgumentException e) {
123             assertEquals(2, request.get());
124         }
125     }
126     
127     @Test
128     public void testSingleDoesNotRequestMoreThanItNeedsIf1Then2Requested() {
129         final List<Long> requests = new ArrayList<Long>();
130         Observable.just(1)
131         //
132                 .doOnRequest(new Action1<Long>() {
133                     @Override
134                     public void call(Long n) {
135                         requests.add(n);
136                     }
137                 })
138                 //
139                 .single()
140                 //
141                 .subscribe(new Subscriber<Integer>() {
142 
143                     @Override
144                     public void onStart() {
145                         request(1);
146                     }
147 
148                     @Override
149                     public void onCompleted() {
150 
151                     }
152 
153                     @Override
154                     public void onError(Throwable e) {
155 
156                     }
157 
158                     @Override
159                     public void onNext(Integer t) {
160                         request(2);
161                     }
162                 });
163         assertEquals(Arrays.asList(2L), requests);
164     }
165     
166     @Test
167     public void testSingleDoesNotRequestMoreThanItNeedsIf3Requested() {
168         final List<Long> requests = new ArrayList<Long>();
169         Observable.just(1)
170         //
171                 .doOnRequest(new Action1<Long>() {
172                     @Override
173                     public void call(Long n) {
174                         requests.add(n);
175                     }
176                 })
177                 //
178                 .single()
179                 //
180                 .subscribe(new Subscriber<Integer>() {
181 
182                     @Override
183                     public void onStart() {
184                         request(3);
185                     }
186 
187                     @Override
188                     public void onCompleted() {
189 
190                     }
191 
192                     @Override
193                     public void onError(Throwable e) {
194 
195                     }
196 
197                     @Override
198                     public void onNext(Integer t) {
199                     }
200                 });
201         assertEquals(Arrays.asList(2L), requests);
202     }
203     
204     @Test
205     public void testSingleRequestsExactlyWhatItNeedsIf1Requested() {
206         final List<Long> requests = new ArrayList<Long>();
207         Observable.just(1)
208         //
209                 .doOnRequest(new Action1<Long>() {
210                     @Override
211                     public void call(Long n) {
212                         requests.add(n);
213                     }
214                 })
215                 //
216                 .single()
217                 //
218                 .subscribe(new Subscriber<Integer>() {
219 
220                     @Override
221                     public void onStart() {
222                         request(1);
223                     }
224 
225                     @Override
226                     public void onCompleted() {
227 
228                     }
229 
230                     @Override
231                     public void onError(Throwable e) {
232 
233                     }
234 
235                     @Override
236                     public void onNext(Integer t) {
237                     }
238                 });
239         assertEquals(Arrays.asList(2L), requests);
240     }
241 
242 
243     @Test
244     public void testSingleWithPredicate() {
245         Observable<Integer> observable = Observable.just(1, 2).single(
246                 new Func1<Integer, Boolean>() {
247 
248                     @Override
249                     public Boolean call(Integer t1) {
250                         return t1 % 2 == 0;
251                     }
252                 });
253 
254         @SuppressWarnings("unchecked")
255         Observer<Integer> observer = mock(Observer.class);
256         observable.subscribe(observer);
257 
258         InOrder inOrder = inOrder(observer);
259         inOrder.verify(observer, times(1)).onNext(2);
260         inOrder.verify(observer, times(1)).onCompleted();
261         inOrder.verifyNoMoreInteractions();
262     }
263 
264     @Test
265     public void testSingleWithPredicateAndTooManyElements() {
266         Observable<Integer> observable = Observable.just(1, 2, 3, 4).single(
267                 new Func1<Integer, Boolean>() {
268 
269                     @Override
270                     public Boolean call(Integer t1) {
271                         return t1 % 2 == 0;
272                     }
273                 });
274 
275         @SuppressWarnings("unchecked")
276         Observer<Integer> observer = mock(Observer.class);
277         observable.subscribe(observer);
278 
279         InOrder inOrder = inOrder(observer);
280         inOrder.verify(observer, times(1)).onError(
281                 isA(IllegalArgumentException.class));
282         inOrder.verifyNoMoreInteractions();
283     }
284 
285     @Test
286     public void testSingleWithPredicateAndEmpty() {
287         Observable<Integer> observable = Observable.just(1).single(
288                 new Func1<Integer, Boolean>() {
289 
290                     @Override
291                     public Boolean call(Integer t1) {
292                         return t1 % 2 == 0;
293                     }
294                 });
295         @SuppressWarnings("unchecked")
296         Observer<Integer> observer = mock(Observer.class);
297         observable.subscribe(observer);
298 
299         InOrder inOrder = inOrder(observer);
300         inOrder.verify(observer, times(1)).onError(
301                 isA(NoSuchElementException.class));
302         inOrder.verifyNoMoreInteractions();
303     }
304 
305     @Test
306     public void testSingleOrDefault() {
307         Observable<Integer> observable = Observable.just(1).singleOrDefault(2);
308 
309         @SuppressWarnings("unchecked")
310         Observer<Integer> observer = mock(Observer.class);
311         observable.subscribe(observer);
312 
313         InOrder inOrder = inOrder(observer);
314         inOrder.verify(observer, times(1)).onNext(1);
315         inOrder.verify(observer, times(1)).onCompleted();
316         inOrder.verifyNoMoreInteractions();
317     }
318 
319     @Test
320     public void testSingleOrDefaultWithTooManyElements() {
321         Observable<Integer> observable = Observable.just(1, 2).singleOrDefault(
322                 3);
323 
324         @SuppressWarnings("unchecked")
325         Observer<Integer> observer = mock(Observer.class);
326         observable.subscribe(observer);
327 
328         InOrder inOrder = inOrder(observer);
329         inOrder.verify(observer, times(1)).onError(
330                 isA(IllegalArgumentException.class));
331         inOrder.verifyNoMoreInteractions();
332     }
333 
334     @Test
335     public void testSingleOrDefaultWithEmpty() {
336         Observable<Integer> observable = Observable.<Integer> empty()
337                 .singleOrDefault(1);
338 
339         @SuppressWarnings("unchecked")
340         Observer<Integer> observer = mock(Observer.class);
341         observable.subscribe(observer);
342 
343         InOrder inOrder = inOrder(observer);
344         inOrder.verify(observer, times(1)).onNext(1);
345         inOrder.verify(observer, times(1)).onCompleted();
346         inOrder.verifyNoMoreInteractions();
347     }
348 
349     @Test
350     public void testSingleOrDefaultWithPredicate() {
351         Observable<Integer> observable = Observable.just(1, 2).singleOrDefault(
352                 4, new Func1<Integer, Boolean>() {
353 
354                     @Override
355                     public Boolean call(Integer t1) {
356                         return t1 % 2 == 0;
357                     }
358                 });
359 
360         @SuppressWarnings("unchecked")
361         Observer<Integer> observer = mock(Observer.class);
362         observable.subscribe(observer);
363 
364         InOrder inOrder = inOrder(observer);
365         inOrder.verify(observer, times(1)).onNext(2);
366         inOrder.verify(observer, times(1)).onCompleted();
367         inOrder.verifyNoMoreInteractions();
368     }
369 
370     @Test
371     public void testSingleOrDefaultWithPredicateAndTooManyElements() {
372         Observable<Integer> observable = Observable.just(1, 2, 3, 4)
373                 .singleOrDefault(6, new Func1<Integer, Boolean>() {
374 
375                     @Override
376                     public Boolean call(Integer t1) {
377                         return t1 % 2 == 0;
378                     }
379                 });
380 
381         @SuppressWarnings("unchecked")
382         Observer<Integer> observer = mock(Observer.class);
383         observable.subscribe(observer);
384 
385         InOrder inOrder = inOrder(observer);
386         inOrder.verify(observer, times(1)).onError(
387                 isA(IllegalArgumentException.class));
388         inOrder.verifyNoMoreInteractions();
389     }
390 
391     @Test
392     public void testSingleOrDefaultWithPredicateAndEmpty() {
393         Observable<Integer> observable = Observable.just(1).singleOrDefault(2,
394                 new Func1<Integer, Boolean>() {
395 
396                     @Override
397                     public Boolean call(Integer t1) {
398                         return t1 % 2 == 0;
399                     }
400                 });
401 
402         @SuppressWarnings("unchecked")
403         Observer<Integer> observer = mock(Observer.class);
404         observable.subscribe(observer);
405 
406         InOrder inOrder = inOrder(observer);
407         inOrder.verify(observer, times(1)).onNext(2);
408         inOrder.verify(observer, times(1)).onCompleted();
409         inOrder.verifyNoMoreInteractions();
410     }
411 
412     @Test
413     public void testSingleWithBackpressure() {
414         Observable<Integer> observable = Observable.just(1, 2).single();
415 
416         Subscriber<Integer> subscriber = spy(new Subscriber<Integer>() {
417 
418             @Override
419             public void onStart() {
420                 request(1);
421             }
422 
423             @Override
424             public void onCompleted() {
425 
426             }
427 
428             @Override
429             public void onError(Throwable e) {
430 
431             }
432 
433             @Override
434             public void onNext(Integer integer) {
435                 request(1);
436             }
437         });
438         observable.subscribe(subscriber);
439 
440         InOrder inOrder = inOrder(subscriber);
441         inOrder.verify(subscriber, times(1)).onError(isA(IllegalArgumentException.class));
442         inOrder.verifyNoMoreInteractions();
443     }
444 
445     @Test(timeout = 30000)
446     public void testIssue1527() throws InterruptedException {
447         //https://github.com/ReactiveX/RxJava/pull/1527
448         Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
449         Observable<Integer> reduced = source.reduce(new Func2<Integer, Integer, Integer>() {
450             @Override
451             public Integer call(Integer i1, Integer i2) {
452                 return i1 + i2;
453             }
454         });
455 
456         Integer r = reduced.toBlocking().first();
457         assertEquals(21, r.intValue());
458     }
459 }